热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

死信|机时_RabbitMQ发布确认

篇首语:本文由编程笔记#小编为大家整理,主要介绍了RabbitMQ发布确认相关的知识,希望对你有一定的参考价值。在生产环境中由于一些不明原因,导致rabb

篇首语:本文由编程笔记#小编为大家整理,主要介绍了RabbitMQ发布确认相关的知识,希望对你有一定的参考价值。


在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况, RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:


1 发布确认 springboot 版本


1.1 确认机制方案


1.2 代码架构图


1.3 配置文件

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE:经测试有两种效果
    • 其一效果和 CORRELATED 值一样会触发回调方法
    • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

spring.rabbitmq.host=182.92.234.71
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

1.4 添加配置类

@Configuration
public class ConfirmConfig
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange()
return new DirectExchange(CONFIRM_EXCHANGE_NAME);

// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue()
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();

// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with("key1");



1.5 消息生产者

@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init()
rabbitTemplate.setConfirmCallback(myCallBack);

@GetMapping("sendMessage/message")
public void sendMessage(@PathVariable String message)
//指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey,
correlationData1);
CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correl
ationData2);
log.info("发送消息内容:", message);



1.6 回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback
/**
* 交换机不管是否收到消息的一个回调方法
* CorrelationData:消息相关数据
* ack:交换机是否收到消息
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
String id = correlationData != null ? correlationData.getId() : "";
if (ack)
log.info("交换机已经收到 id 为:的消息", id);
else
log.info("交换机还未收到 id 为:消息,由于原因:", id, cause);




1.7 消息消费者

@Component
@Slf4j
public class ConfirmConsumer
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message)
String msg=new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:",msg);



1.8 结果分析

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为”key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。


2 回退消息


2.1 Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息, 如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。 那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。


2.2 消息生产者代码

@Slf4j
@Component
public class MessageProducer implements
RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnCallback
@Autowired
private RabbitTemplate rabbitTemplate;
//rabbitTemplate 注入之后就设置该值
@PostConstruct
private void init()
rabbitTemplate.setConfirmCallback(this);
/**
* true:交换机无法将消息进行路由时,会将该消息返回给生产者
* false:如果发现消息无法进行路由,则直接丢弃
*/

rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnCallback(this);

@GetMapping("sendMessage")
public void sendMessage(String message)
//让消息绑定一个 id 值
CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData1);
log.info("发送消息 id 为:内容为", correlationData1.getId(), message + "key1");
CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange", "key2", message + "key2", correlationData2);
log.info("发送消息 id 为:内容为", correlationData2.getId(), message + "key2");

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
String id = correlationData != null ? correlationData.getId() : "";
if (ack)
log.info("交换机收到消息确认成功, id:", id);
else
log.error("消息 id:未成功投递到交换机,原因是:", id, cause);


@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey)
log.info("消息:被服务器退回,退回原因:, 交换机是:, 路由 key:",
new String(message.getBody()), replyText, exchange, routingKey);



2.3 结果分析


3 备份交换机


3.1 是什么?

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。

如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。


3.2 代码架构图


3.3 修改配置类

@Configuration
public class ConfirmConfig
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue()
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();

//声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with("key1");

//声明备份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange()
return new FanoutExchange(BACKUP_EXCHANGE_NAME);

//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange()
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange) exchangeBuilder.build();

// 声明警告队列
@Bean("warningQueue")
public Queue warningQueue()
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();

// 声明报警队列绑定关系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange
backupExchange)
return BindingBuilder.bind(queue).to(backupExchange);

// 声明备份队列
@Bean("backQueue")
public Queue backQueue()
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();

// 声明备份队列绑定关系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange)
return BindingBuilder.bind(queue).to(backupExchange);



3.4 报警消费者

@Component
@Slf4j
public class WarningConsumer
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message)
String msg = new String(message.getBody());
log.error("报警发现不可路由消息: ", msg);



3.5 测试注意事项

重新启动项目的时候需要把原来的 confirm.exchange 删除因为我们修改了其绑定属性,不然报以下错:


3.6 结果分析

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 本文将介绍一种扩展的ASP.NET MVC三层架构框架,并通过使用StructureMap实现依赖注入,以降低代码间的耦合度。该方法不仅能够提高代码的可维护性和可测试性,还能增强系统的灵活性和扩展性。通过具体实践案例,详细阐述了如何在实际开发中有效应用这一技术。 ... [详细]
  • SpringBoot启动脚本详解:BAT文件应用与基础入门指南(SpringBoot系列第1篇)
    如果你还在为SSM框架的复杂搭建过程和繁琐的配置文件而烦恼,那么SpringBoot将是你的一大福音。作为SpringBoot系列的第一篇文章,本文详细介绍了如何使用BAT文件来启动SpringBoot应用,并提供了基础入门指南,帮助开发者快速上手,简化开发流程。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 在Spring与Ibatis集成的环境中,通过Spring AOP配置事务管理至服务层。当在一个服务方法中引入自定义多线程时,发现事务管理功能失效。若不使用多线程,事务管理则能正常工作。本文深入分析了这一现象背后的潜在风险,并探讨了可能的解决方案,以确保事务一致性和线程安全。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 在稀疏直接法视觉里程计中,通过优化特征点并采用基于光度误差最小化的灰度图像线性插值技术,提高了定位精度。该方法通过对空间点的非齐次和齐次表示进行处理,利用RGB-D传感器获取的3D坐标信息,在两帧图像之间实现精确匹配,有效减少了光度误差,提升了系统的鲁棒性和稳定性。 ... [详细]
  • 优化后的标题:在模型属性后立即声明Errors或BindingResult参数的重要性及最佳实践 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 本文深入探讨了在使用 Spring Cloud Feign 时遇到的 `java.lang.IllegalStateException` 异常,具体表现为方法体参数过多的问题。通过详细分析异常原因和代码结构,提出了有效的解决方案,帮助开发者更好地理解和处理这一常见问题。 ... [详细]
  • 如何构建基于Spring MVC框架的Java Web应用项目
    在构建基于Spring MVC框架的Java Web应用项目时,首先应创建一个新的动态Web项目。接着,需将必要的JAR包导入至WebContent/WEB-INF/lib目录下,确保包括Spring核心库及相关依赖。如遇缺失的JAR包,可向社区求助或通过Maven等工具自动下载。正确配置后,即可开始搭建应用结构与功能模块。 ... [详细]
  • Java新手求助:如何优雅地向心仪女生索要QQ联系方式(附代码示例与技巧)
    在端午节后的闲暇时光中,我无意间在技术社区里发现了一篇关于如何巧妙地向心仪女生索取QQ联系方式的文章,顿时感到精神焕发。这篇文章详细介绍了源自《啊哈!算法》的方法,不仅图文并茂,还提供了实用的代码示例和技巧,非常适合 Java 新手学习和参考。 ... [详细]
author-avatar
wangimcf
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有